package com.niit.rdd

import org.apache.spark.{SparkConf, SparkContext}

object SparkRDD_Transform_03 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    /*
    reduceByKey:相同的key的数据进行value数据聚合
     */
    val rdd1 =  sc.makeRDD(List( ("a",1),("a",2) ,("b",3) ,("b",4),("c",5) ) )
    //相同的key的值进行相加,key是不参与运算的..其中key 只有一个值是不参与运算的
    val redRdd =  rdd1.reduceByKey( (x:Int,y:Int) =>{//a: x=1 y =2  b: x=3  y=4
      println(s"x=${x},y=${y}")
      x+y
    })
    redRdd.collect().foreach(println)

    /*
    groupByKey: 将数据源的数据根据 key 对 value 进行分组
     */
    val rdd2 =  sc.makeRDD(List( ("a",1),("a",2) ,("b",3) ,("b",4),("c",5) ) )
    val groupRdd =  rdd2.groupByKey()
    groupRdd.collect().foreach(println)

    /*
    aggregateByKey:将数据根据不同的规则进行分区内计算和分区间计算
     */
    val rdd3 =  sc.makeRDD(List( ("a",1),("a",2) ,("a",3) ,("a",4) ),2 )
    /*
      aggregateByKey存在柯里化，有两个参数列表
      第一个参数列表：需要传递一个参数，表示初始值
         主要用于当碰见第一个key的时候，和value进行分区内计算
      第二个参数列表，需要传递两个参数
        第一个参数表示分区内计算规则
        第二个参数表示分区间计算规则

        【 （a,1） (a,2) 】   【  （a,3） (a,4) 】
   第一步：      (a,2)      +           (a,4)
   第二步：               (a,6)
     */
    //找到分区内的最大值 和 分区间的 和
    println("-------------------------------------")
    rdd3.aggregateByKey(0) (
      (x,y) => math.max(x,y),
      (x,y) => x+y).collect().foreach(println)

    val rdd4 =  sc.makeRDD(List( ("a",1),("a",2) ,("b",3) ,
                                 ("b",4),("b",5),("a",6) ),2 )
    /*
        【 ("a",1),("a",2) ,("b",3) 】 【 ("b",4),("b",5),("a",6)  】
       第一步：    ("a",2)   ("b",3)   +   ("b",5)     ("a",6)
       第二步       （a,2）+ (a,6)          (b,3)+(b,5)
       第三步：         (a,8)                 (b,8)
     */
    println("-------------------------------------")
    rdd4.aggregateByKey(0) (
      (x,y) => math.max(x,y),
      (x,y) => x+y).collect().foreach(println)
    println("-------------------------------------")
    /*
     【 ("a",1),("a",2) ,("b",3) 】 【 ("b",4),("b",5),("a",6)  】
    第一步：  【("a",3) ("b",3)】    【 ("b",9)，("a",6)】
    第二步：   ("a",3)+("a",6)         ("b",3) +  ("b",9)
    第三步：        (a,9)                   (b,12)
     */

    rdd4.aggregateByKey(0) (
      (x,y) => x + y,
      (x,y) => x+y).collect().foreach(println)

    //简易版，如果分区内 和 分区间的计算规则是相同的，就可以用下划线来取代了
    rdd4.aggregateByKey(0) (_ + _, _+ _).collect().foreach(println)
    //===> 升级版：当分区内和分区间计算规则相同时候，可以使用foldByKey
    rdd4.foldByKey(0)( _+_ ).collect().foreach(println)

    /*
    sortByKey:在一个(K,V)的 RDD 上调用，K 必须实现 Ordered 接口(特质)，返回一个按照 key 进行排
序的
     */
    val rdd5 = sc.makeRDD(List(("a",1),("c",3),("b",2)) )
    val sortRdd =  rdd5.sortByKey(false)
    sortRdd.collect().foreach(println)

    /*
     join :
     在类型为(K,V)和(K,W)的 RDD 上调用，返回一个相同 key 对应的所有元素连接在一起的
      (K,(V,W))的 RDD
     */
    println("----------------------------------")
    val rdd6 = sc.makeRDD( List( ("a",1),("a",2), ("c",3),("b",7) ) )
    val rdd7 = sc.makeRDD( List( ("a",5),("c",6), ("a",4),("d",8) ) )
    // join : 两个不同数据源的数据，相同的key的value会连接在一起，形成元组
    // 如果两个数据源中key没有匹配上，那么数据不会出现在结果中
    // 如果两个数据源中key有多个相同的，会依次匹配，可能会出现笛卡尔乘积，数据量会几何性增长，会导致性能降低。
    val joinRdd =  rdd6.join(rdd7)
    joinRdd.collect().foreach(println)

    /*
    . leftOuterJoin / rightOuterJoin :类似于 SQL 语句的左【右】外连接
     */
    println("----------------------------------")
    val joinRdd2 = rdd6.leftOuterJoin(rdd7)
    joinRdd2.collect().foreach(println)
    println("----------------------------------")
    val joinRdd3 = rdd6.rightOuterJoin(rdd7)
    joinRdd3.collect().foreach(println)
    //关闭资源
    sc.stop()
  }

}
